Apache Kafka 是一個分散式事件流平台,提供高效的消息發布與消息訂閱服務,具有高吞吐、可擴展、永久存儲及高可用性等特點,被廣泛地應用在大數據的 data pipeline 當中。
圖片來源: Kafka Broker, Kafka Topic, Consumer and Record Flow in Kafka
兩者的用途不同,但能夠很好的配合使用,Flink 可以做為 Consumer 訂閱 Kafka 主題,同時也可做為 Producer 將結果發佈到 Kafka 主題上。
至官網下載二進制檔或使用下列指令下載 (本次使用的版本是 3.5.0):
安裝
$ mkdir kafka
$ cd kafka
$ wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
$ tar -zxvf kafka_2.13-3.5.0.tgz
$ rm kafka_2.13-3.5.0.tgz
$ cd kafka_2.13-3.5.0
設置環境變數
~/.bashrc
$ vim ~/.bashrc
~/.bashrc
export KAFKA_HOME=~/kafka/kafka_2.13-3.5.0
~/.bashrc
$ source ~/.bashrc
Apache Kafka can be started using ZooKeeper or KRaft.
ZooKeeper 是一個分散式協調服務,可以用來分享配置信息、元數據等,在較早的版本中,Kafka 必須透過 ZooKeeper 來協調管理各個主分區的元數據與狀態。
$ $KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$ $KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
Kafka 在 2.8.0 後提出了內建的共識機制 KRaft,目的是移除 對於 ZooKeeper 的依賴,具體機制可參考:脱离ZooKeeper依赖的Kafka Controller Quorum(KRaft)机制浅析
$ KAFKA_CLUSTER_ID="$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)"
$ $KAFKA_HOME/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties
$ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic <topic_name> --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$ $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
$ $ $KAFKA_HOME/bin/kafka-topics.sh --delete --topic <topic_name> --bootstrap-server loca
lhost:9092
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server localhost:9092
$ $KAFKA_HOME/bin/kafka-console-consumer.sh --topic <topic_name> --bootstrap-server localhost:9092
明天會介紹如何在 Flink 中使用 Apache Kafka Connector 來收發數據流。